home *** CD-ROM | disk | FTP | other *** search
/ Language/OS - Multiplatform Resource Library / LANGUAGE OS.iso / p4 / p4-1_2a.lha / p4-1.2a / lib / p4_tsr.c < prev    next >
C/C++ Source or Header  |  1992-12-15  |  13KB  |  518 lines

  1. #include "p4.h"
  2. #include "p4_sys.h"
  3.  
  4. /*
  5.  * search_p4_queue tries to locate a message of the desired type in the
  6.  * local queue of messages already received.  If it finds one, it dequeues it 
  7.  * if deq is true, and returns its address; otherwise it returns NULL.
  8.  */
  9. struct p4_msg *search_p4_queue(req_type, req_from, deq)
  10. int req_type, req_from;
  11. P4BOOL deq;
  12. {
  13.     struct p4_queued_msg *qpp, *qp;
  14.     struct p4_msg *tqp;
  15.     P4BOOL found;
  16.  
  17.     tqp = NULL;
  18.     qpp = NULL;
  19.     found = FALSE;
  20.     qp = p4_local->queued_messages->first_msg;
  21.  
  22.     while (qp && !found)
  23.     {
  24.     if (((qp->qmsg->type == req_type) || (req_type == -1)) &&
  25.         ((qp->qmsg->from == req_from) || (req_from == -1)))
  26.     {
  27.         found = TRUE;
  28.         if (deq)
  29.         {
  30.         if (p4_local->queued_messages->first_msg ==
  31.             p4_local->queued_messages->last_msg)
  32.         {
  33.             p4_local->queued_messages->first_msg = NULL;
  34.             p4_local->queued_messages->last_msg = NULL;
  35.         }
  36.         else
  37.         {
  38.             if (qp == p4_local->queued_messages->first_msg)
  39.             {
  40.             p4_local->queued_messages->first_msg = qp->next;
  41.             }
  42.             else if (qp == p4_local->queued_messages->last_msg)
  43.             {
  44.             p4_local->queued_messages->last_msg = qpp;
  45.             qpp->next = NULL;
  46.             }
  47.             else
  48.             {
  49.             qpp->next = qp->next;
  50.             }
  51.         }
  52.         }
  53.     }
  54.     else
  55.     {
  56.         qpp = qp;
  57.         qp = qp->next;
  58.     }
  59.     }
  60.     if (found)
  61.     {
  62.     p4_dprintfl(30,"extracted queued msg of type %d from %d\n",
  63.             qp->qmsg->type,qp->qmsg->from);
  64.     tqp = qp->qmsg;
  65.     if (deq)
  66.     {
  67.         free_quel(qp);
  68.     }
  69.     }
  70.     return (tqp);
  71. }
  72.  
  73. /*
  74.  * This is the top-level receive routine, called by the user.
  75.  *   req_type is either a desired type or -1.  In the -1 case it will be
  76.  *        modified  by p4_recv to indicate the type actually received.
  77.  *   req_from is either a desired source or -1.  In the -1 case it will be
  78.  *        modified by p4_recv to the source of the message actually received.
  79.  *   msg will be set by p4_recv to point to a buffer containing the message.
  80.  *   len_rcvd will be set by p4_recv to contain the length of the message.
  81.  *
  82.  *   returns 0 if successful; non-zero if error
  83.  */
  84. int p4_recv(req_type, req_from, msg, len_rcvd)
  85. int *req_type, *req_from, *len_rcvd;
  86. char **msg;
  87. {
  88.     struct p4_msg *tmsg;
  89.     P4BOOL good;
  90.  
  91.     p4_dprintfl(20, "receiving for type = %d, sender = %d\n",
  92.         *req_type, *req_from);
  93.     ALOG_LOG(p4_local->my_id,END_USER,0,"");
  94.     ALOG_LOG(p4_local->my_id,BEGIN_RECV,*req_from,"");
  95.     if (!(tmsg = search_p4_queue(*req_type, *req_from, 1)))
  96.     {
  97.     for (good = FALSE; !good;)
  98.     {
  99.             ALOG_LOG(p4_local->my_id,END_RECV,0,"");
  100.             ALOG_LOG(p4_local->my_id,BEGIN_WAIT,0,"");
  101.             tmsg = recv_message(req_type, req_from);
  102.             ALOG_LOG(p4_local->my_id,END_WAIT,0,"");
  103.             ALOG_LOG(p4_local->my_id,BEGIN_RECV,0,"");
  104.         if (tmsg == NULL)
  105.         {
  106.         p4_dprintf("p4_recv: could not receive a message\n");
  107.         return (-1);
  108.         }
  109.         if (((tmsg->type == *req_type) || (*req_type == -1)) &&
  110.         ((tmsg->from == *req_from) || (*req_from == -1)))
  111.         {
  112.         good = TRUE;
  113.         }
  114.         else
  115.         {
  116.         queue_p4_message(tmsg, p4_local->queued_messages) ;
  117.         }
  118.     }
  119.     }
  120.  
  121.     if (tmsg->ack_req & P4_BROADCAST_MASK)
  122.     {
  123.     if (subtree_broadcast_p4(tmsg->type, tmsg->from,(char *) &tmsg->msg,
  124.                  tmsg->len, tmsg->data_type))
  125.     {
  126.         p4_dprintf("p4_recv: subtree_brdcst failed\n");
  127.         return -1;
  128.     }
  129.     tmsg->ack_req &= ~P4_BROADCAST_MASK;    /* Unset broadcast bit */
  130.     if (tmsg->from == p4_get_my_id())
  131.         free_p4_msg(tmsg);    /* Don't want broadcast from self */
  132.     }
  133.  
  134.     *req_type = tmsg->type;
  135.     *req_from = tmsg->from;
  136.     p4_dprintfl(10, "received type=%d, from=%d\n",*req_type,*req_from);
  137.     if (*msg == NULL)
  138.     {
  139.     *msg = (char *) &(tmsg->msg);
  140.     *len_rcvd = tmsg->len;
  141.     }
  142.     else
  143.     {
  144.     /* copy into the user's buffer area, truncating if necessary */
  145.     if (tmsg->len < *len_rcvd)
  146.         *len_rcvd = tmsg->len;
  147.     bcopy((char *) &(tmsg->msg),*msg,*len_rcvd);
  148.     free_p4_msg(tmsg);
  149.     }
  150.     ALOG_LOG(p4_local->my_id,END_RECV,*req_from,"");
  151.     ALOG_LOG(p4_local->my_id,BEGIN_USER,0,"");
  152.  
  153.     return (0);
  154. }
  155.  
  156. struct p4_msg *recv_message(req_type,req_from)
  157. int *req_type, *req_from;
  158. {
  159.     int rc, len;
  160.     struct p4_msg *tmsg;
  161.  
  162. #if  defined(CAN_DO_SOCKET_MSGS) && \
  163.     !defined(CAN_DO_SHMEM_MSGS)  && \
  164.     !defined(CAN_DO_CUBE_MSGS)   && \
  165.     !defined(CAN_DO_SWITCH_MSGS)
  166.  
  167.     return (socket_recv());
  168.  
  169. #else
  170.  
  171.     while (TRUE)
  172.     {
  173. #       if defined(CAN_DO_SHMEM_MSGS)
  174.     if (shmem_msgs_available())
  175.     {
  176.         return (shmem_recv());
  177.     }
  178. #       endif
  179.  
  180. #       if defined(CAN_DO_SOCKET_MSGS)
  181.     if (socket_msgs_available())
  182.     {
  183.         return (socket_recv());
  184.     }
  185. #       endif
  186.  
  187. #       if defined(CAN_DO_CUBE_MSGS)
  188.     if (MD_cube_msgs_available())
  189.         return (MD_cube_recv());
  190. #       endif
  191.  
  192. #       if defined(CAN_DO_SWITCH_MSGS)
  193.     if (p4_global->proctable[p4_local->my_id].switch_port != -1)
  194.     {
  195.         if (rc = sw_probe(req_from, p4_local->my_id, req_type, &len))
  196.         {
  197.         tmsg = alloc_p4_msg(len - sizeof(struct p4_msg) + sizeof(char *));
  198.         sw_recv(rc, tmsg);
  199.         p4_dprintfl(10, "p4_recv: received message from switch\n");
  200.         return (tmsg);
  201.         }
  202.     }
  203. #       endif
  204. #       if defined(CAN_DO_TCMP_MSGS)
  205.     if (MD_tcmp_msgs_available(req_type,req_from))
  206.     {
  207.         return (MD_tcmp_recv());
  208.     }
  209. #       endif
  210.     }
  211.  
  212. #endif
  213. }
  214.  
  215. P4BOOL p4_messages_available(req_type, req_from)
  216. int *req_type, *req_from;
  217. {
  218.     int found, len;
  219.     struct p4_msg *tmsg;
  220.  
  221.     ALOG_LOG(p4_local->my_id,END_USER,0,"");
  222.     ALOG_LOG(p4_local->my_id,BEGIN_WAIT,1,"");
  223.  
  224.     found = FALSE;
  225.     if (tmsg = search_p4_queue(*req_type, *req_from, 0))
  226.     {
  227.     found = TRUE;
  228.     *req_type = tmsg->type;
  229.     *req_from = tmsg->from;
  230.     }
  231.  
  232. #   if defined(CAN_DO_SHMEM_MSGS)
  233.     while (!found && shmem_msgs_available())
  234.     {
  235.     tmsg = shmem_recv();
  236.     if (((tmsg->type == *req_type) || (*req_type == -1)) &&
  237.         ((tmsg->from == *req_from) || (*req_from == -1)))
  238.     {
  239.         found = TRUE;
  240.         *req_type = tmsg->type;
  241.         *req_from = tmsg->from;
  242.     }
  243.     queue_p4_message(tmsg, p4_local->queued_messages);
  244.     }
  245. #   endif
  246.  
  247. #   if defined(CAN_DO_SOCKET_MSGS)
  248.     while (!found && socket_msgs_available())
  249.     {
  250.     tmsg = socket_recv();
  251.     if (((tmsg->type == *req_type) || (*req_type == -1)) &&
  252.         ((tmsg->from == *req_from) || (*req_from == -1)))
  253.     {
  254.         found = TRUE;
  255.         *req_type = tmsg->type;
  256.         *req_from = tmsg->from;
  257.     }
  258.     queue_p4_message(tmsg, p4_local->queued_messages);
  259.     }
  260. #   endif
  261.  
  262. #   if defined(CAN_DO_CUBE_MSGS)
  263.     while (!found && MD_cube_msgs_available())
  264.     {
  265.     tmsg = MD_cube_recv();
  266.     if (((tmsg->type == *req_type) || (*req_type == -1)) &&
  267.         ((tmsg->from == *req_from) || (*req_from == -1)))
  268.     {
  269.         found = TRUE;
  270.         *req_type = tmsg->type;
  271.         *req_from = tmsg->from;
  272.     }
  273.     queue_p4_message(tmsg, p4_local->queued_messages);
  274.     }
  275. #   endif
  276.  
  277.  
  278. #if defined(CAN_DO_SWITCH_MSGS)
  279.     if (!found && (p4_global->proctable[p4_local->my_id].switch_port != -1))
  280.     {
  281.     if (sw_probe(req_from, p4_local->my_id, req_type, &len))
  282.         found = TRUE;
  283.     }
  284. #endif
  285.  
  286. #if defined(CAN_DO_TCMP_MSGS)
  287.     if (!found && MD_tcmp_msgs_available(req_from,req_type))
  288.     found = TRUE;
  289. #endif
  290.  
  291.     ALOG_LOG(p4_local->my_id,END_WAIT,1,"");
  292.     ALOG_LOG(p4_local->my_id,BEGIN_USER,0,"");
  293.  
  294.     return (found);
  295. }                /* p4_messages_available */
  296.  
  297. P4VOID queue_p4_message(msg, hdr)
  298. struct p4_msg *msg;
  299. struct p4_msg_queue *hdr;
  300. {
  301.     struct p4_queued_msg *q;
  302.  
  303.     q = alloc_quel();
  304.     q->qmsg = msg;
  305.     q->next = NULL;
  306.  
  307.     if (hdr->first_msg == NULL)
  308.     {
  309.     hdr->first_msg = q;
  310.     }
  311.     else
  312.     {
  313.     hdr->last_msg->next = q;
  314.     }
  315.     hdr->last_msg = q;
  316.     p4_dprintfl(30,"queued type %d message for process %d quel=%d\n",
  317.         msg->type,msg->to,q);
  318. }
  319.  
  320.  
  321. int send_message(type, from, to, msg, len, data_type, ack_req, p4_buff_ind)
  322. char *msg;
  323. int type, to, len, data_type;
  324. P4BOOL ack_req, p4_buff_ind;
  325. {
  326.     struct p4_msg *tmsg;
  327.     int conntype = p4_local->conntab[to].type;
  328.  
  329.     p4_dprintfl(90, "send_message: to = %d, conntype=%d conntype=%s\n",
  330.         to, conntype, print_conn_type(conntype));
  331.  
  332.     switch (conntype)
  333.     {
  334.       case CONN_ME:
  335.     tmsg = get_tmsg(type,from,to,msg,len,data_type,ack_req,p4_buff_ind);
  336.     p4_dprintfl(20, "sending msg of type %d to myself\n",type);
  337.     queue_p4_message(tmsg, p4_local->queued_messages);
  338.     p4_dprintfl(10, "sent msg of type %d to myself\n",type);
  339.     break;
  340.  
  341. #ifdef CAN_DO_SHMEM_MSGS
  342.       case CONN_SHMEM:
  343.     tmsg = get_tmsg(type, from, to, msg, len, data_type, 
  344.                         ack_req, p4_buff_ind);
  345.     shmem_send(tmsg);
  346.     break;
  347. #endif
  348.  
  349. #ifdef CAN_DO_CUBE_MSGS
  350.       case CONN_CUBE:
  351.     tmsg = get_tmsg(type,from,to,msg,len,data_type,ack_req,p4_buff_ind);
  352.     MD_cube_send(tmsg);
  353.     if (!p4_buff_ind)
  354.         free_p4_msg(tmsg);
  355.     break;
  356. #endif
  357.  
  358. #ifdef CAN_DO_SOCKET_MSGS
  359.       case CONN_REMOTE_NON_EST:
  360.     if (establish_connection(to))
  361.     {
  362.         p4_dprintfl(90, "send_message: conn just estabd to %d\n", to);
  363.     }
  364.     else
  365.     {
  366.         p4_dprintf("send_message: unable to estab conn to %d\n", to);
  367.         return (-1);
  368.     }
  369.     /* no break; - just fall into connected code */
  370.       case CONN_REMOTE_EST:
  371.     if (data_type == P4NOX || p4_local->conntab[to].same_data_rep)
  372.     {
  373.         socket_send(type, from, to, msg, len, data_type, ack_req);
  374.     }
  375.     else
  376.     {
  377. #           ifdef CAN_DO_XDR
  378.         xdr_send(type, from, to, msg, len, data_type, ack_req);
  379. #           else
  380.         p4_error("cannot do xdr sends\n",0);
  381. #           endif
  382.     }
  383.     break;
  384. #endif
  385.  
  386. #if defined(CAN_DO_SWITCH_MSGS)
  387.       case CONN_REMOTE_SWITCH:
  388.     tmsg = get_tmsg(type,from,to,msg,len,data_type,ack_req,p4_buff_ind);
  389.     p4_dprintfl(20, "sending msg of type %d from %d to %d via switch_port %d\n",
  390.                 tmsg->type,tmsg->from,to,p4_local->conntab[tmsg->to].switch_port,tmsg);
  391.     sw_send(from, to,
  392.         p4_local->conntab[tmsg->to].switch_port, tmsg,
  393.         tmsg->len + sizeof(struct p4_msg) - sizeof(char *),
  394.         type);
  395.     p4_dprintfl(10, "sent msg of type %d from %d to %d via switch_port %d\n",
  396.                 tmsg->type,tmsg->from,to,p4_local->conntab[tmsg->to].switch_port,tmsg);
  397.     if (!p4_buff_ind)
  398.         free_p4_msg(tmsg);
  399.     break;
  400. #endif
  401.  
  402. #if defined(CAN_DO_TCMP_MSGS)
  403.       case CONN_TCMP:
  404.     tmsg = get_tmsg(type,from,to,msg,len,data_type,ack_req,p4_buff_ind);
  405.     p4_dprintfl(20, "sending msg of type %d to %d via tcmp\n",type,to);
  406.     MD_tcmp_send(type, from, to, tmsg, 
  407.              tmsg->len + sizeof(struct p4_msg) - sizeof(char *),
  408.              data_type, ack_req);
  409.     p4_dprintfl(10, "sent msg of type %d to %d via tcmp\n",type,to);
  410.     break;
  411. #endif
  412.  
  413.       case CONN_REMOTE_DYING:
  414.     p4_dprintfl(90, "send_message: proc %d is dying\n", to);
  415.     return (-1);
  416.  
  417.       default:
  418.     p4_dprintf("send_message: to=%d; invalid conn type=%d\n",to,conntype);
  419.     return (-1);
  420.     }
  421.  
  422.     return (0);
  423. }                /* send_message */
  424.  
  425. struct p4_msg *get_tmsg(type,from,to,msg,len,data_type,ack_req,p4_buff_ind)
  426. char *msg;
  427. int type, from, to, len, data_type, ack_req, p4_buff_ind;
  428. {
  429.     int i;
  430.     struct p4_msg *tmsg;
  431.  
  432.     if (p4_buff_ind)
  433.     {
  434.     tmsg = (struct p4_msg *) (msg - sizeof(struct p4_msg) + sizeof(char *));
  435.     }
  436.     else
  437.     {
  438.         tmsg = alloc_p4_msg(len);
  439.     if (tmsg == NULL)
  440.     {
  441.         p4_dprintf("OOPS! get_tmsg: could not alloc buff **\n");
  442.         return (NULL);
  443.     }
  444.     bcopy(msg, (char *) &(tmsg->msg), len);
  445.     }
  446.     tmsg->type = type;
  447.     tmsg->from = from;
  448.     tmsg->to = to;
  449.     tmsg->len = len;
  450.     tmsg->ack_req = ack_req;
  451.     tmsg->data_type = data_type;
  452.     return (tmsg);
  453. }
  454.  
  455.  
  456. char *p4_msg_alloc(msglen)
  457. int msglen;
  458. {
  459.     char *t;
  460.  
  461.     t = (char *) alloc_p4_msg(msglen);
  462.     ((struct p4_msg *) t)->msg_id = -1;    /* msg not in use by IPSC isend */
  463.     t = t + sizeof(struct p4_msg) - sizeof(char *);
  464.     return(t);
  465. }
  466.  
  467. P4VOID p4_msg_free(m)
  468. char *m;
  469. {
  470.     char *t;
  471.  
  472.     t = m - sizeof(struct p4_msg) + sizeof(char *);
  473.     ((struct p4_msg *) t)->msg_id = -1;    /* msg not in use by IPSC isend */
  474.     free_p4_msg((struct p4_msg *) t);
  475. }
  476.  
  477.  
  478. P4VOID initialize_msg_queue(mq)
  479. struct p4_msg_queue *mq;
  480. {
  481.     mq->first_msg = NULL;
  482.     mq->last_msg = NULL;
  483.     (P4VOID) p4_moninit(&(mq->m), 1);
  484.     p4_lock_init(&(mq->ack_lock));
  485.     p4_lock(&(mq->ack_lock));
  486. }
  487.  
  488.  
  489. struct p4_queued_msg *alloc_quel()
  490. {
  491.     struct p4_queued_msg *q;
  492.  
  493.     p4_lock(&p4_global->avail_quel_lock);
  494.     if (p4_global->avail_quel == NULL)
  495.     {
  496.     q = (struct p4_queued_msg *) p4_shmalloc(sizeof(struct p4_queued_msg));
  497.     if (!q)
  498.         p4_error("alloc_quel:  could not allocate queue element",
  499.              sizeof(struct p4_queued_msg));
  500.     }
  501.     else
  502.     {
  503.     q = p4_global->avail_quel;
  504.     p4_global->avail_quel = q->next;
  505.     }
  506.     p4_unlock(&p4_global->avail_quel_lock);
  507.     return (q);
  508. }
  509.  
  510. P4VOID free_quel(q)
  511. struct p4_queued_msg *q;
  512. {
  513.     p4_lock(&p4_global->avail_quel_lock);
  514.     q->next = p4_global->avail_quel;
  515.     p4_global->avail_quel = q;
  516.     p4_unlock(&p4_global->avail_quel_lock);
  517. }
  518.